#!/usr/bin/env python
import asyncio
import glob
import os
import subprocess
import sys
from contextlib import contextmanager
from importlib import import_module
from pathlib import Path
from typing import Generator, List

import typer
import yaml

DEFAULT_ROOT_PATH = os.environ.get(
    "PREFECT_INSTALL_PATH", "~/src/github.com/PrefectHQ/prefect"
)
DEFAULT_ROOT = Path(DEFAULT_ROOT_PATH).expanduser()
COLLECTIONS_DIR = DEFAULT_ROOT / "src" / "integrations"

app = typer.Typer(help="Utilities for managing Prefect collections en masse")


def prefect_collections() -> List[str]:
    entries = []

    for file in glob.glob("docs/integrations/catalog/*.yaml"):
        if file.lower().endswith("/template.yaml"):
            continue

        with open(file) as f:
            entries.append(yaml.safe_load(f))

    entries = [e for e in entries if e["author"] == "Prefect"]
    return [e["collectionName"] for e in entries]


def collection_paths() -> Generator[Path, None, None]:
    for collection in prefect_collections():
        yield COLLECTIONS_DIR / collection


@contextmanager
def _default_profile() -> Generator[None, None, None]:
    import prefect.context

    original = prefect.context.get_settings_context().profile.name
    if original == "default":
        yield
        return

    subprocess.check_call(["prefect", "profile", "use", "default"])
    try:
        yield
    finally:
        subprocess.check_call(["prefect", "profile", "use", original])


@app.command()
def test():
    """Runs tests on all collections"""
    with _default_profile():
        for collection_path in collection_paths():
            subprocess.check_call(["pytest", "tests"], cwd=str(collection_path))


@app.command()
def list_collections():
    """Lists all Prefect-managed collections"""
    for collection in prefect_collections():
        print(collection)


@app.command()
def run_function(function_path: str):
    """Runs the given Python function (specified as a module path) on all collections.
    The Python function must be importable from the current directory. For example:

        collections-manager run-function my_module.my_nested_module:my_function
    """
    module_name, _, function_name = function_path.partition(":")
    if not module_name or not function_name:
        raise typer.BadParameter(
            "function_name must be specified as a module path followed by a colon and "
            "a function name, like my_module.my_nested_module:my_function"
        )

    sys.path.insert(0, ".")

    module = import_module(module_name)
    function = getattr(module, function_name)

    for collection_path in collection_paths():
        if asyncio.iscoroutinefunction(function):
            asyncio.run(function(collection_path))
        else:
            function(collection_path)


if __name__ == "__main__":
    app()
